


----------------------------------------------------------------
--- Copyright：    迷你玩-海马
--- Description：  作为客户端时，用来请求data_cache的封装
---
--- 提供的函数：
---		read    读 (可选是否远程加锁)
---		save    写 (最大保留时间 et=expire_time=30天 )
---		del     删除 (也可以save里设置et=0.001,立即失效)
---
---		mread   批量读取
---
---     unlock_key_hash 远程解锁
---
----------------------------------------------------------------



-- ( 连接 DC data_cache shm服务器专用文件 ) (  XServer[shmDcClient] --> DcSever )

local assert     = assert
local require    = require
local bit        = bit
local coroutine  = coroutine
local debug      = debug
local io         = io
local pairs      = pairs
local ipairs     = ipairs
local math       = math
local os         = os
local print      = print
local pcall      = pcall
local xpcall     = xpcall
local rawget     = rawget
local rawset     = rawset
local select     = select
local string     = string
local table      = table
local tonumber   = tonumber
local tostring	 = tostring
local error      = error
local type       = type
local unpack     = unpack
local setmetatable = setmetatable
local getmetatable = getmetatable
local ngx = ngx

require "resty.core.shdict"

local ngx_shared  = ngx.shared;
local string_sub  = string.sub
local string_gsub = string.gsub
local string_format = string.format

local log        = require("modules.commhm.log");
local ns_time    = require("modules.commhm.time");
local uu         = require("modules.commhm.utils");
local ns_network = require("modules.commhm.network");

local ns_centerServerProxy   = require("modules.commhm.centerServerProxy");

local table_insert = table.insert
local ngx_sleep = ngx.sleep
local ngx_now   = ngx.now


-------------------------------------------------------------------------------
--  __READ_ONLY__  只读   不用锁
-------------------------------------------------------------------------------


-- shm MGR类
local _M = { _VERSION = '0.12.1' }
local mt = { __index = _M }


function _M.new(self, module_name_, config_server_, config_select_ )
	log.debug( log.fast() and "call shmDcClient:new");
	local ins_ = {
		config_cache  = {};
		module_name   = module_name_;

		config_server = config_server_;
		config_select = config_select_;

		carry_index = ngx.worker.id() * 10000 + 10000;
	};

	-- 异步上报config，失败不影响使用
	local config_str_ = table.tostring( { server=config_server_, select=config_select_ } )
	ns_centerServerProxy.WWW_configReport( module_name_ .. '_dc4', config_str_ )

    return setmetatable(ins_, mt);
end



--释放所有锁(远程)
function _M.final( self )
	--释放locks
	if  ngx.ctx.resty_dc_locks_shm then
		for k, v in pairs( ngx.ctx.resty_dc_locks_shm ) do
			if  v and v.select then
				if  v.cc > 0 then
					local ok, err_ = self:send_unlock_kh_to_dc( k, v.select )  --远程解锁
					if  ok then
						if  v.cc > 1 then
							log.day_list_simple( "shm", "lock_rw", "unlock_ok2", ngx.worker.id(),
									uu.get_act_cmd(), k, v.cc, debug.traceback() );
						end
					else
						log.day_list_simple( "shm", "lock_rw", "error_unlock2", ngx.worker.id(),
								uu.get_act_cmd(), k, v.cc, err_, debug.traceback() );
					end
				else
					--log.day_list_simple( "shm", "lock_rw", "unlocked2", ngx.worker.id(),
							--uu.get_act_cmd(), k, v.cc, debug.traceback() );
				end
			else
				log.day_list_simple( "shm", "lock_rw", "error_locks2", ngx.worker.id(),
						uu.get_act_cmd(), k, v.cc, debug.traceback() );
			end
		end
		ngx.ctx.resty_dc_locks_shm = nil
	end

end


---解锁全部
function _M.unlock_all( self )
	self:final()
end



-------------------------------------------API-------------------------------
---解锁指定kh
function _M.unlock_key_hash( self, pp_ )
	--local pp_ = {
		--select = 1       --- 用来做十分表或者百分表的分表id，一般是uin
		--key    = 10001   --- key值，一般是uin
		--hash   = 1       --- 子分类hash 数字或者字符串
		--err_call_back    --- 错误的回调，可以为空
	--}
	self:send_unlock_kh_to_dc( pp_.key .. "_h" .. pp_.hash, pp_.select, pp_.err_call_back )
end



---批量读取 (只能针对某一个分库select，需要业务二次封装)
function _M.mread( self, pp_ )
	--local pp_ = {
		--select    = 1                   ---用来做十分表或者百分表的分表id，一般是uin
	    --kv_list_ :  { {key=1001, hash=2},{key=1002, hash=3},{key=1005, hash=3} }   ---kv的列表
		--stale     = 1,                  ---是否过期数据也要  1=过期也返回
		--err_call_back = function        ---出错时候的回调函数，可以为空
	--}

	log.debug( log.fast() and "call shmDcClient:mread" )
	local kv_list_ = pp_.kv_list

	local kv_list_t_ = {}
	for i=1, #kv_list_ do
		kv_list_t_[ #kv_list_t_ + 1 ] =  kv_list_[i].key
		kv_list_t_[ #kv_list_t_ + 1 ] =  kv_list_[i].hash
	end

	if  #kv_list_t_ <= 0 then
		return {}
	end

	local kv_list_str_ = table.concat( kv_list_t_, '-' )

	local pp_wrap_ = {
		func    = 'mread',
		select  = pp_.select,
		kv_list = kv_list_str_,
	}

	local ret_ = self:private_post_to_dc_server( pp_wrap_, pp_.err_call_back )
	local t_ = table.loadstring( ret_ ) or {}

	if  type(t_) == 'table' then
		for i=1, #t_ do
			if  t_[i] == "not_found" then
				t_[i] = ngx.null           --与原版本一致
			end
		end
	end

	return t_
end



---读取单个数据
function _M.read( self, pp_ )
	--local pp_ = {
		--select    = 1      ---用来做十分表或者百分表的分表id，一般是uin
		--key       = 10001
		--hash      = 2
		--stale     = 1,     ---是否过期数据也要
		--read_only = 1,     ---是否读的时候不加锁（速度稍快,读后再写会有错误日志）
		--err_call_back = function    ---出错时候的回调函数，可以为空
	--}

	pp_.func    = 'read'
	local key_hash_ = pp_.key .. "_h" .. pp_.hash

	if  ngx.ctx.__READ_ONLY__ == 1 then
		pp_.read_only = 1   --只读请求
	end

	if  pp_.read_only == 1 then
		--只读
	else
		--是否已经被锁住了，如果已经锁了，转为一个只读请求
		if  not ngx.ctx.resty_dc_locks_shm then
			ngx.ctx.resty_dc_locks_shm = {}
		end
		if  ngx.ctx.resty_dc_locks_shm[ key_hash_ ] and ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc > 0 then
			ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc = ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc + 1
			pp_.read_only = 1   --只读
			log.day_list_simple( "shm", "lock_rw", "relock_add_0", ngx.worker.id(), uu.get_act_cmd(), key_hash_,
					ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc, debug.traceback() );
		end
	end

	local ret_ = self:private_post_to_dc_server( pp_, pp_.err_call_back, 3000 )

	if  ret_ == '' then
		--返回空 (一般是获取锁失败) 重试
		if  ngx.ctx.__no_Content_Length__ then
			log.error( "private_post_to_dc_server retry " .. uu.get_act_cmd() ..  "\n" .. key_hash_ )
			ret_ = self:private_post_to_dc_server( pp_, pp_.err_call_back )
		else
			log.error( "private_post_to_dc_server has content-length, ignore retry " .. key_hash_ )
		end
	end

	if  ret_  and ( not (pp_.read_only == 1) ) then
		--记录key_hash, 被锁定了
		if  not ngx.ctx.resty_dc_locks_shm then
			ngx.ctx.resty_dc_locks_shm = {}
		end

		if  ngx.ctx.resty_dc_locks_shm[ key_hash_ ] then
			ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc = ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc + 1

			if  ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc > 1 then
				--重复锁定
				log.day_list_simple( "shm", "lock_rw", "relock_add_1", ngx.worker.id(), uu.get_act_cmd(), key_hash_,
						ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc, debug.traceback() );
			end

		else
			--新锁成功
			ngx.ctx.resty_dc_locks_shm [ key_hash_ ] = { cc=1, select=pp_.select }
			--log.day_list_simple( "shm", "lock_rw", "lock_ok",  ngx.worker.id(), uu.get_act_cmd(), 0, key_hash_ );
		end
	end

	if  ret_ == "not_found" then
		ret_ = ngx.null          --与原版本一致
	end

	return ret_
end



-----写入单个数据，并解开远程锁
function _M.save( self, pp_ )
	--local pp_ = {
		--select  = select_,          ---用来做十分表或者百分表的分表id，一般是uin
		--key     = key,
		--hash    = hash,
		--value   = value,
		--et      = et_,              ---超时时间 expire_time  300秒 600秒 3600秒
		--read_only  = read_only_,    ---是否不解远程锁 (请求结束时会统一解锁一次未解开的锁)
		--err_call_back = function    ---出错时候的回调函数，可以为空
	--}

	---超时时间判断
	---默认一天，最大值30天，超过30天最好数据落地
	if  not pp_.et then
		pp_.et = 86400
	end
	if  pp_.et <=0 or pp_.et > 86400*30 then
		pp_.et = 86400*30
	end

	pp_.func = "save"

	if  ngx.ctx.__READ_ONLY__ == 1 then
		--只读接口有写操作
		log.day_list_simple( "shm", "lock_rw", "error_save_where_read_only", ngx.worker.id(),
				uu.get_act_cmd(), pp_.key .. "_h" .. pp_.hash, debug.traceback() );
	end

	--写操作 判断一下锁
	local ret_ = self:private_post_to_dc_server( pp_, pp_.err_call_back, 3000 )
	if  ret_ then
		--试图解锁
		local key_hash_ = pp_.key .. "_h" .. pp_.hash
		if  pp_.read_only_ == 1 then
			--不用解锁 ( 或者作为主动参数传入，暂时不用解锁，等待final解锁 )
		elseif  ngx.ctx.resty_dc_locks_shm and ngx.ctx.resty_dc_locks_shm[ key_hash_ ] then
			if  ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc > 0 then
				ngx.ctx.resty_dc_locks_shm[ key_hash_ ].cc = 0   --解锁成功
				--log.day_list_simple( "shm", "lock_rw", "unlock_ok",
						--ngx.worker.id(), uu.get_act_cmd(), 0, key_hash_ );
			else
				--重复解锁 使用缓存连写两次
				--log.day_list_simple( "shm", "lock_rw", "warn_re_unlock",  ngx.worker.id(),
						--uu.get_act_cmd(), 0,     key_hash_, debug.traceback() );
			end
		else
			--找不到锁
			log.day_list_simple( "shm", "lock_rw", "error_not_find_lock", ngx.worker.id(),
					    uu.get_act_cmd(), 'nil', key_hash_, debug.traceback() );
		end
	end
	return ret_
end



--- 删除一个key下的一个hash
function _M.del( self, pp_ )
	--local pp_ = {
		--select  = select,
		--key     = key,
		--hash    = hash,
		--err_call_back
	--}
	pp_.func = 'del'
	return self:private_post_to_dc_server( pp_, pp_.err_call_back )
end




---------------------------------------------------private------------------
---发送请求到服务器 POST
_M.private_post_to_dc_server = function( self, pp_, err_call_back, p_time_exp_ )

	ngx.ctx.err_call_back = err_call_back;
	local url_ = self:private_get_url_by_select( pp_.select )

	self.carry_index = self.carry_index + 1
	local Carry_ = '' .. self.carry_index

	--http://120.24.64.132:8087/dc/?auth=Xcn67K&name=posting&cmd=comm&name=map&carry=111aaa
	url_ = url_ .. "&cmd=comm&carry=" .. Carry_ .. "&select=" .. pp_.select
	if  ngx.ctx.m_params.uin then
		url_ = url_ .. "&uin=" .. ngx.ctx.m_params.uin
	end

	if  pp_.err_call_back then
		pp_.err_call_back = nil   ---函数不用打包
	end

	local post_data_ = ngx.encode_args(pp_);
	local ret_, body_, resp_ = ns_network.postHttpPage( url_, post_data_, p_time_exp_ )

	if  log.fast() then
		local url_pos_ = string.find( url_, '?' )
		if  url_pos_ and url_pos_>0 then
			log.debug( "call private_post_to_dc_server: " .. string.sub( url_, 1, url_pos_-1 ) )
		else
			log.debug( "call private_post_to_dc_server: " .. url_ )
		end
		uu.var_dump( pp_ )
		log.debug( post_data_ )
		log.debug( "shm post ret=" .. ret_ .. ", body=" .. (body_ or 'nil') )
	end


	if  ret_ == 200 then
		--验证Carry
		if  resp_.headers.Carry == Carry_ then
			if  resp_.headers["Content-Length"] then
				if  resp_.headers["Content-Length"] ==  ('' .. #body_) then
					return body_   --ok
				else
					log.day_list_simple( "shm", "dc_client", "DataLen_check_error",
							resp_.headers["Content-Length"], #body_ )
				end
			else
				ngx.ctx.__no_Content_Length__ = 1
				return body_       --ok 没有Content-Length
			end
		else
			log.day_list_simple( "shm", "dc_client", "Carry_check_error", resp_.headers.Carry, #Carry_ )
		end
	else
		log.day_list_simple( "shm", "dc_client", "http_error", ret_, url_ )
	end

	self:error_exit( 'error select = ' .. (pp_.select or 'nil') );
end



---------------------------------------
---根据shm_name获得连接的url
_M.private_get_url_by_select = function( self, select_ )
	if  self.config_cache[ select_ ]  then
		--先查config cache
		log.debug( log.fast() and "find config_cache for " .. select_ );
		return  self.config_cache[ select_ ];
	else
		if  self.config_server and self.config_select then
			if  self.config_select[select_] and
				self.config_server[ self.config_select[select_] ]     and
				self.config_server[ self.config_select[select_] ].url then
				local ret_ = self.config_server[ self.config_select[select_] ].url;
				if  ret_ then
					---log.debug( log.fast() and "set config_cache for " .. select_ );
					self.config_cache[ select_ ] = ret_;
					return ret_;
				end
			end
		end
	end

	self:error_exit( 'error shm_name = ' .. (select_ or 'nil') );

end


---发送远程解锁请求，解锁一个key hash对
---(一般情况下read加锁，save解锁，或者 final() 统一解锁，很少这样去主动解锁)
function _M.send_unlock_kh_to_dc( self, key_hash_, select_, err_call_back )
	local pp_ = {
		func    = 'unlock_kh',
		select  = select_,
		kh      = key_hash_,
	}
	return self:private_post_to_dc_server( pp_, err_call_back, 3000 )
end


-- 出错处理
_M.error_exit = function(self, txt, trace_info_ )
	if  ngx.ctx.err_call_back then
		--log.error( "has set error_exit. " .. (txt or 'nil') );
		ngx.ctx.err_call_back(txt, trace_info_);
	else
		log.error( "not set error_exit. " .. (txt or 'nil'), trace_info_ );
	end
end



---------------------------------------------校验函数----------------------------
--写校验[_k_]
_M.private_check_set_key = function(self, select_, key, hash, value )
	if  value._k_ then
		if  ( tonumber(value._k_) and ( tonumber(key) == tonumber(value._k_) ) ) or (key == value._k_) then
			--ok
		else
			log.day_list_short( "shm", "error_key_check", "set", select_, key, value._k_, hash, debug.traceback() );
			log.error( "error_set_key_check" )
			return false;
		end
	else
		value._k_ = tonumber(key) or key;  --首次设置
	end

	if  value._h_ then
		if  ( tonumber(value._h_) and ( tonumber(hash) == tonumber(value._h_) ) ) or (hash == value._h_) then
			--ok
		else
			log.day_list_short( "shm", "error_hash_check", "set", select_, key, value._h_, hash );
			log.error( "error_set_hash_check" )
			return false;
		end
	else
		if  tonumber(hash) then
			value._h_ = tonumber(hash) --首次设置
		end
	end


	value._t_ = uu.now()    --最后一次写入时间
	return true;
end


--读校验[_k_]
_M.private_check_get_key = function(self, select_, key, hash, value )
	if  value._k_ then
		if  ( tonumber(value._k_) and ( tonumber(key) == tonumber(value._k_) ) ) or (key == value._k_) then
			--ok
		else
			--读取出错
			log.day_list_short( "shm", "error_key_check", "get", select_, key, value._k_, hash, debug.traceback() );
			log.error( "error_get_key_check" )
			return false;
		end
	end

	if  value._h_ then
		if  ( tonumber(value._h_) and ( tonumber(hash) == tonumber(value._h_) ) ) or (hash == value._h_) then
			--ok
		else
			--读取出错
			log.day_list_short( "shm", "error_hash_check", "get", select_, key, value._h_, hash );
			log.error( "error_get_hash_check" )
			return false;
		end
	end

	return true;
end



return _M;
